/// The tests in this file are checking that bootstrapping of the database works correctly
/// They are testing edge cases that may accidentally occur with bugs - we wan't to make sure
/// the system can recover in light of these issues.
///
/// We may want to move these tests to another suite, as they aren't testing the statements like
/// the other tests are.
mod helpers;
mod parse;

use helpers::new_ds;
use serial_test::serial;
use surrealdb::err::Error;
use surrealdb::kvs::LockType::Optimistic;
use surrealdb::kvs::Transaction;
use surrealdb::kvs::TransactionType::Write;
use surrealdb::sql::statements::LiveStatement;
use surrealdb::sql::Uuid;

#[tokio::test]
#[serial]
async fn bootstrap_removes_unreachable_nodes() -> Result<(), Error> {
	// Create the datastore
	let dbs = new_ds().await.unwrap();

	let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
	// Introduce missing nodes (without heartbeats)
	let bad_node = uuid::Uuid::parse_str("9d8e16e4-9f6a-4704-8cf1-7cd55b937c5b").unwrap();
	tx.set_nd(bad_node).await.unwrap();

	// Introduce a valid chain of data to confirm it is not removed from a cleanup
	a_valid_notification(
		&mut tx,
		ValidNotificationState {
			timestamp: None,
			node_id: None,
			live_query_id: None,
			notification_id: None,
			namespace: "testns".to_string(),
			database: "testdb".to_string(),
			table: "testtb".to_string(),
		},
	)
	.await
	.unwrap();

	tx.commit().await.unwrap();

	// Bootstrap
	dbs.bootstrap().await.unwrap();

	// Declare a function that will assert
	async fn try_validate(tx: &mut Transaction, bad_node: &uuid::Uuid) -> Result<(), String> {
		let res = tx.scan_nd(1000).await.map_err(|e| e.to_string())?;
		tx.commit().await.map_err(|e| e.to_string())?;
		for node in &res {
			if node.name == bad_node.to_string() {
				return Err(format!("The node name was actually the bad node {:?}", bad_node));
			}
		}
		// {Node generated by bootstrap} + {valid node who's uuid we don't know}
		assert_eq!(res.len(), 2);
		if res.len() != 2 {
			return Err("Expected 2 nodes".to_string());
		}
		Ok(())
	}

	// Verify the incorrect node is deleted, but self and valid still exist
	let res = {
		let mut err = None;
		for _ in 0..5 {
			let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
			let res = try_validate(&mut tx, &bad_node).await;
			if res.is_ok() {
				return Ok(());
			}
			err = Some(res);
			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
		}
		err.unwrap()
	};
	res.unwrap();
	Ok(())
}

#[tokio::test]
#[serial]
async fn bootstrap_removes_unreachable_node_live_queries() -> Result<(), Error> {
	// Create the datastore
	let dbs = new_ds().await.unwrap();

	// Introduce an invalid node live query
	let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
	let valid_data = a_valid_notification(
		&mut tx,
		ValidNotificationState {
			timestamp: None,
			node_id: None,
			live_query_id: None,
			notification_id: None,
			namespace: "testns".to_string(),
			database: "testdb".to_string(),
			table: "testtb".to_string(),
		},
	)
	.await
	.unwrap();
	let bad_nd_lq_id = uuid::Uuid::parse_str("67b0f588-2b95-4b6e-87f3-73d0a49034be").unwrap();
	tx.putc_ndlq(
		valid_data.clone().node_id.unwrap().0,
		bad_nd_lq_id,
		&valid_data.namespace,
		&valid_data.database,
		&valid_data.table,
		None,
	)
	.await
	.unwrap();
	tx.commit().await.unwrap();

	// Bootstrap
	dbs.bootstrap().await.unwrap();

	// Verify node live query is deleted
	let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
	let res = tx.scan_ndlq(valid_data.node_id.as_ref().unwrap(), 1000).await.unwrap();
	tx.commit().await.unwrap();
	assert_eq!(res.len(), 1, "We expect the node to be available");
	let tested_entry = res.first().unwrap();
	assert_eq!(tested_entry.lq, valid_data.live_query_id.unwrap());

	Ok(())
}

#[tokio::test]
#[serial]
async fn bootstrap_removes_unreachable_table_live_queries() -> Result<(), Error> {
	// Create the datastore
	let dbs = new_ds().await.unwrap();

	// Introduce an invalid table live query
	let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();
	let valid_data = a_valid_notification(
		&mut tx,
		ValidNotificationState {
			timestamp: None,
			node_id: None,
			live_query_id: None,
			notification_id: None,
			namespace: "testns".to_string(),
			database: "testdb".to_string(),
			table: "testtb".to_string(),
		},
	)
	.await
	.unwrap();
	let bad_tb_lq_id = uuid::Uuid::parse_str("97b8fbe4-a147-4420-95dc-97db3a46c491").unwrap();
	let mut live_stm = LiveStatement::default();
	live_stm.id = bad_tb_lq_id.into();
	tx.putc_tblq(&valid_data.namespace, &valid_data.database, &valid_data.table, live_stm, None)
		.await
		.unwrap();
	tx.commit().await.unwrap();

	// Bootstrap
	dbs.bootstrap().await.unwrap();

	// Verify invalid table live query is deleted
	let mut tx = dbs.transaction(Write, Optimistic).await.unwrap();

	let res = tx
		.scan_tblq(&valid_data.namespace, &valid_data.database, &valid_data.table, 1000)
		.await
		.unwrap();
	tx.commit().await.unwrap();

	assert_eq!(res.len(), 1, "Expected 1 table live query: {:?}", res);
	let tested_entry = res.first().unwrap();
	assert_eq!(tested_entry.lq, valid_data.live_query_id.unwrap());
	Ok(())
}

#[tokio::test]
#[serial]
async fn bootstrap_removes_unreachable_live_query_notifications() -> Result<(), Error> {
	Ok(())
}

/// ValidBootstrapState is a representation of a chain of information that bootstrap is concerned
/// with. It is used for two reasons
/// - sometimes we want to detect invalid data that has a valid path (notification without a live query).
/// - sometimes we want to detect existing valid data is not deleted
#[derive(Debug, Clone)]
struct ValidNotificationState {
	pub timestamp: Option<u64>,
	pub node_id: Option<Uuid>,
	pub live_query_id: Option<Uuid>,
	pub notification_id: Option<Uuid>,
	pub namespace: String,
	pub database: String,
	pub table: String,
}

/// Create a chain of valid state that bootstrapping should not remove.
/// As a general rule, there is no need to override the system defaults since this code is to place generic data.
/// If you see these IDs, it is because you captured this entry.
/// So its ok to share ID between tests
async fn a_valid_notification(
	tx: &mut Transaction,
	args: ValidNotificationState,
) -> Result<ValidNotificationState, Error> {
	let now = tx.clock().await;
	let default_node_id =
		Uuid::from(uuid::Uuid::parse_str("123e9d92-c975-4daf-8080-3082e83cfa9b").unwrap());
	let default_lq_id =
		Uuid::from(uuid::Uuid::parse_str("ca02c2d0-31dd-4bf0-ada4-ee02b1191e0a").unwrap());
	let default_not_id =
		Uuid::from(uuid::Uuid::parse_str("c952cf7d-b503-4370-802e-cd2404f2160d").unwrap());
	let entry = ValidNotificationState {
		timestamp: Some(args.timestamp.unwrap_or(now.value)),
		node_id: Some(args.node_id.unwrap_or(default_node_id)),
		live_query_id: Some(args.live_query_id.unwrap_or(default_lq_id)),
		notification_id: Some(args.notification_id.unwrap_or(default_not_id)),
		..args
	};
	let mut live_stm = LiveStatement::default();
	live_stm.id = entry.live_query_id.unwrap();
	live_stm.node = entry.node_id.unwrap();

	// Create heartbeat
	tx.set_hb(entry.timestamp.unwrap().into(), entry.node_id.unwrap().0).await?;
	// Create cluster node entry
	tx.set_nd(entry.node_id.unwrap().0).await?;
	// Create node live query registration
	tx.putc_ndlq(
		entry.node_id.unwrap().0,
		entry.live_query_id.unwrap().0,
		&entry.namespace,
		&entry.database,
		&entry.table,
		None,
	)
	.await?;
	// Create table live query registration
	tx.putc_tblq(&entry.namespace, &entry.database, &entry.table, live_stm, None).await?;
	// TODO Create notification
	// tx.putc_tbnt(
	// ).await?;
	Ok(entry)
}
